package com.hanxiaozhang.example.controller;

import com.alibaba.fastjson.JSON;
import com.hanxiaozhang.constant.RocketConstant;
import com.hanxiaozhang.example.entity.RocketMessage;
import com.hanxiaozhang.example.listener.mgsconsumer.No5PullMgsOriginalSyntax;
import com.hanxiaozhang.example.listener.mgsconsumer.No7LitePullSubscribeMsgOriginalSyntax;
import com.hanxiaozhang.example.listener.mgsconsumer.No8LitePullAssignMsgOriginalSyntax;
import com.hanxiaozhang.example.listener.mgssend.No6LocalTransactionOriginalSyntaxListener;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;

import java.time.LocalDateTime;
import java.util.*;


/**
 * 〈一句话功能简述〉<br>
 * 〈〉
 *
 * @author hanxinghua
 * @create 2022/9/26
 * @since 1.0.0
 */
@Slf4j
@RequestMapping("/")
@Controller
public class RocketController {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    @Autowired
    private No5PullMgsOriginalSyntax pullMgsOriginal;
    @Autowired
    private No7LitePullSubscribeMsgOriginalSyntax litePullSubscribeMsg;
    @Autowired
    private No8LitePullAssignMsgOriginalSyntax litePullAssignMsg;

    /*
    目录：
    一、生产者：
    1.普通消息发送：同步、异步、单向
    2.顺序消息发送：普通顺序、严格顺序
    3.延迟消息发送
    4.批量消息发送
    5.事务消息发送
    6.带Tag消息发送

    二、消费者：
    1.MessageModel(消息模型)：集群消费与广播消费
    2.ConsumeMode(消费模型)：并发消费与顺序消费
    3.消息过滤：Tag过滤与SQL92过滤
    4.消息重试与死信队列
    5.消息应答
    6.Pull消费（原始Pull Consumer与Lite Pull Consumer）
    7.设置消费点位
    8.消费者手动应答

     */

    // 一、生产者：
    // ----- ----- 1.普通消息发送 ----- -----

    /**
     * 发送普通同步消息
     *
     * @return
     */
    @GetMapping("/sendMqBySync")
    @ResponseBody
    public Object sendMqBySync() {
        RocketMessage message = RocketMessage.builder().name("普通同步消息" + LocalDateTime.now()).build();
        // syncSend
        SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.COMMON_TOPIC, message);
        return sendResult;
    }

    /**
     * 发送异步消息
     *
     * @return
     */
    @GetMapping("/sendMqByAsync")
    @ResponseBody
    public Object sendMqByAsync() {
        RocketMessage message = RocketMessage.builder().name("异步消息" + LocalDateTime.now()).build();
        // asyncSend
        rocketMQTemplate.asyncSend(RocketConstant.ASYNC_TOPIC, message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                // 处理相应的业务
                log.info("发送成功:{}", JSON.toJSONString(sendResult));
            }

            @Override
            public void onException(Throwable throwable) {
                // 处理相应的业务
                log.info("发送异常:{}", throwable);
            }
        });
        return null;
    }


    /**
     * 发送单向消息
     * <p>
     * 这种方式主要用在不特别关心发送结果的场景，例如日志发送
     *
     * @return
     */
    @GetMapping("/sendMqByOneWay")
    @ResponseBody
    public Object sendMqByOneWay() {
        RocketMessage message = RocketMessage.builder().name("单向消息" + LocalDateTime.now()).build();
        // sendOneWay
        rocketMQTemplate.sendOneWay(RocketConstant.ONE_WAY_TOPIC, message);
        return null;
    }


    // ----- ----- 2.顺序消息发送 ----- -----

    /**
     * 发送普通顺序消息
     *
     * @return
     */
    @GetMapping("/sendMqByOrder")
    @ResponseBody
    public Object sendMqByOrder() {

        List<SendResult> results = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            RocketMessage message = RocketMessage.builder().name("普通顺序消息" + LocalDateTime.now() + i).build();
            // syncSendOrderly
            SendResult sendResult = rocketMQTemplate.syncSendOrderly(RocketConstant.COMMON_TOPIC, message, "hashkey");
            results.add(sendResult);
        }
        return results;
    }


    /**
     * 发送严格顺序消息
     * <p>
     * 概念：
     * 顺序消息是一种对消息发送和消费顺序有严格要求的消息
     * <p>
     * 生产顺序性：
     * RocketMQ通过生产者和服务端的协议保障单个生产者串行地发送消息，并按序存储和持久化。如需保证消息生产的顺序性，则必须满足以下条件：
     * 单一生产者： 消息生产的顺序性仅支持单一生产者，不同生产者分布在不同的系统，即使设置相同的分区键，不同生产者之间产生的消息也无法判定其先后顺序。
     * 串行发送：生产者客户端支持多线程安全访问，但如果生产者使用多线程并行发送，则不同线程间产生的消息将无法判定其先后顺序。
     *
     * @return
     */
    @GetMapping("/sendMqByStrictOrder")
    @ResponseBody
    public Object sendMqByStrictOrder() {
        List<SendResult> results = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            RocketMessage message = RocketMessage.builder().name("严格顺序消息" + LocalDateTime.now() + i).build();
            // syncSendOrderly
            SendResult sendResult = rocketMQTemplate.syncSendOrderly(RocketConstant.STRICT_ORDER_TOPIC, message, "hashkey");
            results.add(sendResult);
        }
        return results;
    }


    // ----- ----- 3.延迟消息发送 ----- -----

    /**
     * 发送延时消息
     * <p>
     * 延时消息的使用限制
     * private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
     * 现在RocketMq并不支持任意时间的延时，需要设置几个固定的延时等级，从1s到2h分别对应着等级1到18，
     * 消息消费失败会进入延时消息队列
     *
     * @return
     */
    @GetMapping("/sendMqByDelay")
    @ResponseBody
    public Object sendMqByDelay() {
        RocketMessage message = RocketMessage.builder().name("延时消息" + LocalDateTime.now()).build();
        // syncSend(... int delayLevel)
        SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.DELAY_TOPIC, MessageBuilder.withPayload(message).build(), 2000, 4);
        return sendResult;
    }


    // ----- ----- 4.批量消息发送 ----- -----

    /**
     * 批量发送消息
     * <p>
     * 在对吞吐率有一定要求的情况下，RocketMQ可以将一些消息聚成一批以后进行发送，
     * 可以增加吞吐率，并减少API和网络调用次数
     *
     * @return
     */
    @GetMapping("/sendMqByBatch")
    @ResponseBody
    public Object sendMqByBatch() {

        List<Message> messageList = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            RocketMessage message = RocketMessage.builder().name("批量消息" + LocalDateTime.now() + i).build();
            messageList.add(MessageBuilder.withPayload(message).build());
        }
        // syncSend
        SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.COMMON_TOPIC, messageList);
        return sendResult;
    }


    // ----- ----- 5.事务消息发送 ----- -----

    /**
     * 发送事务消息（半消息）
     * <p>
     * 仅仅只是保证本地事务和MQ消息发送形成整体的原子性，而投递到MQ服务器后，
     * 并无法保证消费者一定能消费成功
     * <p>
     *
     * @return
     */
    @GetMapping("/sendMqByTx")
    @ResponseBody
    public Object sendMqByTx(Integer type, Integer msgKey) {
        String transactionId = UUID.randomUUID().toString();
        No6LocalTransactionOriginalSyntaxListener transactionListener = new No6LocalTransactionOriginalSyntaxListener();
        TransactionMQProducer producer = new TransactionMQProducer(RocketConstant.TX_PRODUCER_GROUP);
        try {
            producer.setTransactionListener(transactionListener);
            producer.setNamesrvAddr(RocketConstant.NAME_SERVER_ADDR);
            producer.start();
            log.info("transactionId is {}", transactionId);
            org.apache.rocketmq.common.message.Message msg = new org.apache.rocketmq.common.message.Message(RocketConstant.TX_TOPIC,
                    ("事务消息" + LocalDateTime.now()).getBytes(RemotingHelper.DEFAULT_CHARSET));
            msg.getProperties().put(RocketMQHeaders.TRANSACTION_ID, transactionId);
            msg.getProperties().put("type", String.valueOf(type));
            msg.getProperties().put("msgKey", String.valueOf(msgKey));
            SendResult sendResult = producer.sendMessageInTransaction(msg, null);
            return sendResult;
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.shutdown();
        }

        return null;
    }


    // ----- ----- 6.带Tag消息发送 ----- -----

    /**
     * 发送带Tag消息
     * <p>
     * Tag（标签）可以看作子主题，它是消息的第二级类型
     * 通过 RocketMQTemplate发送带Tag的消息，只需要将topic和tag中间通过【:】冒号连接即可
     *
     * @return
     */
    @GetMapping("/sendMqWithTag")
    @ResponseBody
    public Object sendMqWithTag() {
        RocketMessage message = RocketMessage.builder().name("tag消息" + LocalDateTime.now()).build();
        SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.TAG_TOPIC + ":" + RocketConstant.TAG_EXPRESSION, message);
        return sendResult;
    }


    // 二、消费者：
    // ----- ----- 1.MessageModel(消息模型)：集群消费与广播消费 ----- -----

    /**
     * 发送集群或广播消息
     * <p>
     * 广播消费模式下，相同Consumer Group的每个Consumer实例都接收全量的消息。
     *
     * @param MessageModel 0:CLUSTERING   1:BROADCASTING
     * @return
     */
    @GetMapping("/sendMqByMessageModel")
    @ResponseBody
    public Object sendMqByMessageModel(@RequestParam Integer MessageModel) {

        List<SendResult> results = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            if (MessageModel == 0) {
                RocketMessage message = RocketMessage.builder().name("消息模型-集群消费" + LocalDateTime.now() + i).build();
                SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.COMMON_TOPIC, message);
                results.add(sendResult);
            } else {
                RocketMessage message = RocketMessage.builder().name("消息模型-广播消费" + LocalDateTime.now() + i).build();
                SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.BROADCAST_TOPIC, message);
                results.add(sendResult);
            }
        }
        return results;
    }

    // ----- ----- 2.ConsumeMode(消费模型)：并发消费与顺序消费 ----- -----

    /**
     * 并发消费与顺序消费
     *
     * @param consumeMode 0:CONCURRENTLY  1:ORDERLY
     * @return
     */
    @GetMapping("/sendMqByConsumeMode")
    @ResponseBody
    public Object sendMqByConsumeMode(Integer consumeMode) {
        List<SendResult> results = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            if (consumeMode == 0) {
                RocketMessage message = RocketMessage.builder().name("消费模型-并发消费" + LocalDateTime.now() + i).build();
                SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.COMMON_TOPIC, message);
                results.add(sendResult);
            } else {
                RocketMessage message = RocketMessage.builder().name("消费模型-顺序消费" + LocalDateTime.now() + i).build();
                SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.STRICT_ORDER_TOPIC, message);
                results.add(sendResult);
            }
        }
        return results;
    }


    // ----- ----- 3.消息过滤：Tag过滤与SQL92过滤 ----- -----

    /**
     * Tag过滤与SQL92过滤
     * <p>
     * Tag过滤：
     * 消费者订阅的Tag和发送者设置的消息Tag相互匹配，则消息被投递给消费端进行消费。
     * SQL92过滤：
     * 发送者设置Tag或自定义消息属性，消费者订阅满足SQL92过滤表达式的消息被投递给消费端进行消费。
     * 开启对SQL语法的支持（broker.conf）：
     * enablePropertyFilter = true
     *
     * @param filterMode 0:Tag过滤、1:SQL92过滤Tag、2:SQL92过滤自定义消息属性
     * @return
     */
    @GetMapping("/sendMqByFilterMode")
    @ResponseBody
    public Object sendMqByFilterMode(Integer filterMode) {
        if (filterMode == 0) {
            RocketMessage message = RocketMessage.builder().name("消费过滤-Tag过滤" + LocalDateTime.now()).build();
            SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.TAG_TOPIC + ":" + RocketConstant.TAG_EXPRESSION, message);
            return sendResult;
        } else if (filterMode == 1) {
            RocketMessage message = RocketMessage.builder().name("消费过滤-SQL92过滤Tag" + LocalDateTime.now()).build();
            SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.SQL92_TOPIC + ":" + RocketConstant.SQL92_TAG_EXPRESSION, message);
            return sendResult;
        } else {
            // Message msg = new Message("topic", "tagA", "Hello MQ".getBytes())
            // 设置自定义属性A，属性值为1。-> msg.putUserProperties("a", "1")
            //  RocketMQTemplate 目前好像不支持这种写法
            RocketMessage message = RocketMessage.builder().name("消费过滤-SQL92过滤自定义消息属性" + LocalDateTime.now()).build();
            Map<String, Object> map = new HashMap<>();
            map.put("a", 1);
            MessageHeaders messageHeaders = new MessageHeaders(map);
            SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.SQL92_PROPERTIES_TOPIC, MessageBuilder.createMessage(message, messageHeaders));
            return sendResult;
        }
    }


    // ----- ----- 4.消息重试与死信队列 ----- -----

    /**
     * 消息重试与死信队列
     * <p>
     * 1. 消息重试只针对集群消费模式生效；广播消费模式不提供失败重试特性，即消费失败后，
     * 失败消息不再重试，继续消费新的消息。
     * 2. 一条消息初次消费失败后，会自动进行消息重试，达到最大重试次数后，将其发送到该消费者对应的死信队列，
     * 这类消息称为死信消息（Dead-Letter Message）。死信队列是死信Topic下，分区数唯一的单独队列。
     * 3. 如果产生了死信消息，那对应的ConsumerGroup的死信Topic名称为%DLQ%ConsumerGroupName，
     * 死信队列的消息将不会再被消费。
     * 4. 可以利用RocketMQ Admin工具或者RocketMQ Dashboard上查询到对应死信消息的信息。
     *
     * @return
     */
    @GetMapping("/sendMqByRetry")
    @ResponseBody
    public Object sendMqByRetry() {
        RocketMessage message = RocketMessage.builder().name("消息重试" + LocalDateTime.now()).build();
        // syncSend
        SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.RETRY_TOPIC, message);
        return sendResult;
    }


    // ----- ----- 5.消息应答 ----- -----

    /**
     * 消息应答
     * Request-Replay机制，可以模拟RPC
     *
     * @return
     */
    @GetMapping("/sendByReply")
    @ResponseBody
    public Object sendByReply() {
        RocketMessage message = RocketMessage.builder().name("消息应答" + LocalDateTime.now()).build();
        RocketMessage receiveMessage = rocketMQTemplate.sendAndReceive(RocketConstant.REPLY_TOPIC, message, RocketMessage.class);
        return receiveMessage;
    }




    // ----- ----- 6.Pull消费（原始Pull Consumer与Lite Pull Consumer） ----- -----
    /*
    在RocketMQ中有两种Pull方式：
    比较原始Pull Consumer：它不提供相关的订阅方法，需要调用pull方法时指定队列进行拉取，并需要自己更新位点。
    Lite Pull Consumer（RocketMQ 4.6.0推出的）：它提供了Subscribe和Assign两种方式，使用起来更加方便。
     */

    /**
     * 原始Pull Consumer的消息发送
     *
     * @return
     */
    @GetMapping("/sendByOriginalPull")
    @ResponseBody
    public Object sendByOriginalPull() {

        List<Message> messageList = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            RocketMessage message = RocketMessage.builder().name("原始pull消息" + LocalDateTime.now() + i).build();
            messageList.add(MessageBuilder.withPayload(message).build());
        }

        // syncSend
        SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.PULL_ORIGINAL_TOPIC, messageList);
        return sendResult;
    }

    @GetMapping("/pullByOriginal")
    @ResponseBody
    public void pullByOriginal() {

        pullMgsOriginal.pull(0, 2);
    }

    /**
     * 使用rocketMQTemplate拉取消息
     *
     * @return
     */
    @GetMapping("/sendByTemplatePull")
    @ResponseBody
    public Object sendByTemplatePull() {

        List<Message> messageList = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            RocketMessage message = RocketMessage.builder().name("template pull消息" + LocalDateTime.now() + i).build();
            messageList.add(MessageBuilder.withPayload(message).build());
        }
        // syncSend
        SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.LITE_PULL_TEMPLATE_TOPIC, messageList);
        return sendResult;
    }


    /**
     * LitePullSubscribe
     *
     * @return
     */
    @GetMapping("/sendBySubscribePull")
    @ResponseBody
    public Object sendBySubscribePull() {

        List<Message> messageList = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            RocketMessage message = RocketMessage.builder().name("subscribe pull消息" + LocalDateTime.now() + i).build();
            messageList.add(MessageBuilder.withPayload(message).build());
        }
        // syncSend
        SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.LITE_PULL_SUBSCRIBE_TOPIC, messageList);
        return sendResult;
    }


    @GetMapping("/pullBySubscribe")
    @ResponseBody
    public void pullBySubscribe() {

        litePullSubscribeMsg.pull(20);
    }

    /**
     * LitePullAssign
     *
     * @return
     */
    @GetMapping("/sendByAssignPull")
    @ResponseBody
    public Object sendByAssignPull() {

        List<Message> messageList = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            RocketMessage message = RocketMessage.builder().name("assign pull消息" + LocalDateTime.now() + i).build();
            messageList.add(MessageBuilder.withPayload(message).build());
        }
        // syncSend
        SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.LITE_PULL_ASSIGN_TOPIC, messageList);
        return sendResult;
    }


    @GetMapping("/pullByAssign")
    @ResponseBody
    public void pullByAssign() {

        litePullAssignMsg.pull();
    }


    // ----- ----- 7.设置消费点位 ----- -----

    /**
     * 消费点类型设置成：
     * ConsumeFromWhere.CONSUME_FROM_TIMESTAMP
     *
     * @return
     */
    @GetMapping("/sendMqByConsumePoint")
    @ResponseBody
    public Object sendMqByConsumePoint() {
        RocketMessage message = RocketMessage.builder().name("设置消费点位" + LocalDateTime.now()).build();
        // syncSend
        SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.CONSUME_POINT_TOPIC, message);
        return sendResult;
    }


    // ----- ----- 8.消费者手动应答 ----- -----

    /**
     * @param mgs 0:会抛空指针，重试三次。
     * @return
     */
    @GetMapping("/sendMqByManualConfirm")
    @ResponseBody
    public Object sendMqByManualConfirm(Integer mgs) {
        RocketMessage message = RocketMessage.builder().name(mgs == null ? "消费者手动应答" + LocalDateTime.now() : String.valueOf(mgs)).build();
        // syncSend
        SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.MANUAL_CONFIRM_TOPIC, message);
        return sendResult;
    }


}
